/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.util.threads;

import org.apache.tomcat.util.res.StringManager;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
 * If a RejectedExecutionHandler is not specified a default one will be configured
 * and that one will always throw a RejectedExecutionException
 *
 * @author fhanik
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
	/**
	 * The string manager for this package.
	 */
	protected static final StringManager sm = StringManager
			.getManager("org.apache.tomcat.util.threads.res");

	/**
	 * The number of tasks submitted but not yet finished. This includes tasks
	 * in the queue and tasks that have been handed to a worker thread but the
	 * latter did not start executing the task yet.
	 * This number is always greater or equal to {@link #getActiveCount()}.
	 */
	private final AtomicInteger submittedCount = new AtomicInteger(0);
	private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);

	/**
	 * Most recent time in ms when a thread decided to kill itself to avoid
	 * potential memory leaks. Useful to throttle the rate of renewals of
	 * threads.
	 */
	private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);

	/**
	 * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
	 */
	private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;

	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
		prestartAllCoreThreads();
	}

	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
	                          RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
		prestartAllCoreThreads();
	}

	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
		prestartAllCoreThreads();
	}

	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
		prestartAllCoreThreads();
	}

	public long getThreadRenewalDelay() {
		return threadRenewalDelay;
	}

	public void setThreadRenewalDelay(long threadRenewalDelay) {
		this.threadRenewalDelay = threadRenewalDelay;
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		submittedCount.decrementAndGet();

		if (t == null) {
			stopCurrentThreadIfNeeded();
		}
	}

	/**
	 * If the current thread was started before the last time when a context was
	 * stopped, an exception is thrown so that the current thread is stopped.
	 */
	protected void stopCurrentThreadIfNeeded() {
		if (currentThreadShouldBeStopped()) {
			long lastTime = lastTimeThreadKilledItself.longValue();
			if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
				if (lastTimeThreadKilledItself.compareAndSet(lastTime,
						System.currentTimeMillis() + 1)) {
					// OK, it's really time to dispose of this thread

					final String msg = sm.getString(
							"threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
							Thread.currentThread().getName());

					throw new StopPooledThreadException(msg);
				}
			}
		}
	}

	protected boolean currentThreadShouldBeStopped() {
		if (threadRenewalDelay >= 0
				&& Thread.currentThread() instanceof TaskThread) {
			TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
			if (currentTaskThread.getCreationTime() <
					this.lastContextStoppedTime.longValue()) {
				return true;
			}
		}
		return false;
	}

	public int getSubmittedCount() {
		return submittedCount.get();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public void execute(Runnable command) {
		execute(command, 0, TimeUnit.MILLISECONDS);
	}

	/**
	 * Executes the given command at some time in the future.  The command
	 * may execute in a new thread, in a pooled thread, or in the calling
	 * thread, at the discretion of the <tt>Executor</tt> implementation.
	 * If no threads are available, it will be added to the work queue.
	 * If the work queue is full, the system will wait for the specified
	 * time and it throw a RejectedExecutionException if the queue is still
	 * full after that.
	 *
	 * @param command the runnable task
	 * @throws RejectedExecutionException if this task cannot be
	 *                                    accepted for execution - the queue is full
	 * @throws NullPointerException       if command or unit is null
	 */
	public void execute(Runnable command, long timeout, TimeUnit unit) {
		submittedCount.incrementAndGet();
		try {
			super.execute(command);
		} catch (RejectedExecutionException rx) {
			if (super.getQueue() instanceof TaskQueue) {
				final TaskQueue queue = (TaskQueue) super.getQueue();
				try {
					if (!queue.force(command, timeout, unit)) {
						submittedCount.decrementAndGet();
						throw new RejectedExecutionException("Queue capacity is full.");
					}
				} catch (InterruptedException x) {
					submittedCount.decrementAndGet();
					Thread.interrupted();
					throw new RejectedExecutionException(x);
				}
			} else {
				submittedCount.decrementAndGet();
				throw rx;
			}

		}
	}

	public void contextStopping() {
		this.lastContextStoppedTime.set(System.currentTimeMillis());

		// save the current pool parameters to restore them later
		int savedCorePoolSize = this.getCorePoolSize();
		TaskQueue taskQueue =
				getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
		if (taskQueue != null) {
			// note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
			// checks that queue.remainingCapacity()==0. I did not understand
			// why, but to get the intended effect of waking up idle threads, I
			// temporarily fake this condition.
			taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
		}

		// setCorePoolSize(0) wakes idle threads
		this.setCorePoolSize(0);

		// TaskQueue.take() takes care of timing out, so that we are sure that
		// all threads of the pool are renewed in a limited time, something like
		// (threadKeepAlive + longest request time)

		if (taskQueue != null) {
			// ok, restore the state of the queue and pool
			taskQueue.setForcedRemainingCapacity(null);
		}
		this.setCorePoolSize(savedCorePoolSize);
	}

	private static class RejectHandler implements RejectedExecutionHandler {
		@Override
		public void rejectedExecution(Runnable r,
		                              java.util.concurrent.ThreadPoolExecutor executor) {
			throw new RejectedExecutionException();
		}

	}


}
